{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "5d1953c3-0253-4d1b-a630-61e61b61d10d",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: An illegal reflective access operation has occurred\n",
      "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
      "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
      "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
      "WARNING: All illegal access operations will be denied in a future release\n",
      "22/05/17 06:57:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Debug -- spark init\n",
      "Debug -- version: 3.1.2\n",
      "Debug -- applicaitonId: local-1652770633175\n",
      "Debug -- uiWebUrl: http://jupyter.my.nginx.test/hub/user-redirect/proxy/4040/jobs/\n"
     ]
    }
   ],
   "source": [
    "import metaspore as ms\n",
    "\n",
    "spark_confs={\n",
    "    \"spark.network.timeout\":\"500\",\n",
    "    \"spark.ui.showConsoleProgress\": \"true\",\n",
    "    \"spark.kubernetes.executor.deleteOnTermination\":\"true\",\n",
    "}\n",
    "spark = ms.spark.get_session(local=True,\n",
    "                            app_name='ESMM read data',\n",
    "                            batch_size=256,\n",
    "                            worker_count=2,\n",
    "                            server_count=2,\n",
    "                            worker_memory='5G',\n",
    "                            server_memory='5G',\n",
    "                            coordinator_memory='5G',\n",
    "                            spark_confs=spark_confs)\n",
    "sc = spark.sparkContext\n",
    "print('Debug -- spark init')\n",
    "print('Debug -- version:', sc.version)   \n",
    "print('Debug -- applicaitonId:', sc.applicationId)\n",
    "print('Debug -- uiWebUrl:', sc.uiWebUrl)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "6cc83c34-d1e0-419a-9176-63fa9b7154e1",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_path = '${MY_S3_BUCKET}/aliccp/traindata_10w.csv'\n",
    "test_path = '${MY_S3_BUCKET}/aliccp/testdata_10w.csv'\n",
    "\n",
    "train_output_path ='${MY_S3_BUCKET}/aliccp/traindata_10w.parquet'\n",
    "test_output_path = '${MY_S3_BUCKET}/aliccp/testdata_10w.parquet'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "b77270e1-5b02-4617-90eb-5e9a2a27ab44",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_dataset = spark.read.csv(train_path,  sep=',')\n",
    "test_dataset = spark.read.csv(test_path,  sep=',')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "5b171630-457f-4f67-aae0-47351d286e77",
   "metadata": {},
   "outputs": [],
   "source": [
    "from collections import defaultdict\n",
    "\n",
    "all_field_list = [\n",
    "    '101', '109_14', '110_14', '127_14', '150_14', '121', '122', '124',\n",
    "    '125', '126', '127', '128', '129', '205', '206', '207', '210',\n",
    "    '216', '508', '509', '702', '853', '301'\n",
    "]\n",
    "\n",
    "def get_aliccp_fields():\n",
    "    all_field_dict = defaultdict(int)\n",
    "    for i, field_id in enumerate(all_field_list):\n",
    "        all_field_dict[field_id] = i\n",
    "    return all_field_list, all_field_dict\n",
    "\n",
    "def get_aliccp_columns():\n",
    "    return ['label', 'ctr_label', 'cvr_label'] + all_field_list\n",
    "    \n",
    "\n",
    "def transform(row, max_len=10, sep=u'\\u0001', default_padding='-1'):\n",
    "    all_field_list, all_field_dict = get_aliccp_fields()\n",
    "    output_buffer = [(field_id, []) for field_id in all_field_dict]\n",
    "    \n",
    "    ctr_label = 0\n",
    "    ctr_label = 0\n",
    "    for key, value in row.asDict().items():\n",
    "        if key == '_c0': # row number\n",
    "            continue\n",
    "        elif key == '_c1':\n",
    "            ctr_label = int(value)\n",
    "        elif key == '_c2':\n",
    "            cvr_label = int(value)\n",
    "        else:\n",
    "            if value is None or value =='':\n",
    "                continue\n",
    "            else:\n",
    "                field_id, feature_id = value.strip().split(':')\n",
    "                if field_id not in all_field_dict:\n",
    "                    continue\n",
    "                index = all_field_dict[field_id]\n",
    "                output_buffer[index][1].append(int(feature_id))\n",
    "    \n",
    "    output_list=[]\n",
    "    output_list.append(str(ctr_label * cvr_label))\n",
    "    output_list.append(str(ctr_label))\n",
    "    output_list.append(str(cvr_label))\n",
    "    for i in range(len(all_field_list)):\n",
    "        if len(output_buffer[i][1]) == 0:\n",
    "            output_list.append(default_padding)\n",
    "        else:\n",
    "            seqs = output_buffer[i][1]\n",
    "            if len(output_buffer[i][1]) > max_len:\n",
    "                seqs = output_buffer[i][1][:max_len]\n",
    "            output_list.append(sep.join([str(x) for x in seqs]))\n",
    "    return output_list"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "446e27e9-2fe3-4676-a0a0-bf46143de646",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "fg_train_dataset = train_dataset.rdd.map(lambda x: transform(x)).toDF(get_aliccp_columns())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "da04e433-fe56-4a38-b0b5-4b2562ea4612",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/05/17 06:57:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n",
      "                                                                                \r"
     ]
    },
    {
     "data": {
      "text/plain": [
       "100001"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fg_train_dataset.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "7c91b028-ca5a-468c-866c-2c33088fc757",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "fg_train_dataset.write.parquet(train_output_path, mode=\"overwrite\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "b57671ea-aed1-43b9-b294-2b92dd89c465",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 8:=============================>                             (1 + 1) / 2]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+---------+---------+---+---------------------------------------+---------------------------------------+---------------------------------------+-----------------------------+---+---+---+---+---+---+---+---+---+---+---+---------------------------------------+---+---+---+---+---+---+\n",
      "|label|ctr_label|cvr_label|101|109_14                                 |110_14                                 |127_14                                 |150_14                       |121|122|124|125|126|127|128|129|205|206|207|210                                    |216|508|509|702|853|301|\n",
      "+-----+---------+---------+---+---------------------------------------+---------------------------------------+---------------------------------------+-----------------------------+---+---+---+---+---+---+---+---+---+---+---+---------------------------------------+---+---+---+---+---+---+\n",
      "|0    |0        |0        |12 |418\u0001419\u0001420\u0001421\u0001422\u0001423\u0001424\u0001425\u0001426\u0001427|535\u0001536\u0001537\u0001538\u0001539\u0001540\u0001541\u0001542\u0001543\u0001544|209\u0001210\u0001211\u0001212\u0001213\u0001214\u0001215\u0001216\u0001217\u0001218|18\u000119\u000120\u000121\u000122\u000123\u000124\u000125\u000126\u000127|10 |11 |13 |14 |-1 |15 |16 |17 |6  |7  |8  |1\u00012\u00013                                  |4  |9  |-1 |-1 |-1 |5  |\n",
      "|0    |0        |0        |12 |418\u0001419\u0001420\u0001421\u0001422\u0001423\u0001424\u0001425\u0001426\u0001427|535\u0001536\u0001537\u0001538\u0001539\u0001540\u0001541\u0001542\u0001543\u0001544|209\u0001210\u0001211\u0001212\u0001213\u0001214\u0001215\u0001216\u0001217\u0001218|18\u000119\u000120\u000121\u000122\u000123\u000124\u000125\u000126\u000127|10 |11 |13 |14 |-1 |15 |16 |17 |883|884|885|877\u0001878\u0001879\u0001880\u0001881                    |882|-1 |-1 |-1 |-1 |5  |\n",
      "|0    |1        |0        |12 |418\u0001419\u0001420\u0001421\u0001422\u0001423\u0001424\u0001425\u0001426\u0001427|535\u0001536\u0001537\u0001538\u0001539\u0001540\u0001541\u0001542\u0001543\u0001544|209\u0001210\u0001211\u0001212\u0001213\u0001214\u0001215\u0001216\u0001217\u0001218|18\u000119\u000120\u000121\u000122\u000123\u000124\u000125\u000126\u000127|10 |11 |13 |14 |-1 |15 |16 |17 |894|893|892|886\u0001887\u0001896\u0001897\u0001898\u0001899\u0001900\u0001901\u0001902\u0001903|895|889|888|890|891|5  |\n",
      "+-----+---------+---------+---+---------------------------------------+---------------------------------------+---------------------------------------+-----------------------------+---+---+---+---+---+---+---+---+---+---+---+---------------------------------------+---+---+---+---+---+---+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "fg_train_dataset_load = spark.read.parquet(train_output_path)\n",
    "fg_train_dataset_load[fg_train_dataset_load['101']=='12'].show(3, False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "6bd5d4d8-4456-49d5-a4ee-1832a4d631b6",
   "metadata": {},
   "outputs": [],
   "source": [
    "fg_test_dataset = test_dataset.rdd.map(lambda x: transform(x)).toDF(get_aliccp_columns())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "2ad35702-8de5-4c36-b2cf-1dd949569458",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "data": {
      "text/plain": [
       "100001"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fg_test_dataset.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "22d93a28-03de-4e24-8e1b-c2e0b79e2a9e",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "fg_test_dataset.write.parquet(test_output_path, mode=\"overwrite\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "e96f8504-cdd7-4623-bd16-d975c5de06cd",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 15:=============================>                            (1 + 1) / 2]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+---------+---------+---+---------------------------------------------+---------------------------------------+-----------------------------------------------+----------------------------------------+---+---+---+---+---+---+---+---+------+----+------+-----------------------------------------+------+----+----+----+----+---+\n",
      "|label|ctr_label|cvr_label|101|109_14                                       |110_14                                 |127_14                                         |150_14                                  |121|122|124|125|126|127|128|129|205   |206 |207   |210                                      |216   |508 |509 |702 |853 |301|\n",
      "+-----+---------+---------+---+---------------------------------------------+---------------------------------------+-----------------------------------------------+----------------------------------------+---+---+---+---+---+---+---+---+------+----+------+-----------------------------------------+------+----+----+----+----+---+\n",
      "|0    |0        |0        |12 |77517\u0001418\u0001419\u000168464\u0001420\u0001421\u0001422\u000119118\u0001423\u0001424|876\u0001535\u0001536\u0001537\u0001538\u0001539\u0001540\u0001541\u0001542\u0001543|209\u0001210\u000168326\u0001211\u0001530271\u0001212\u0001213385\u0001213\u0001214\u0001215|18\u000119\u000120\u000121\u000126\u000128\u0001530261\u000131\u0001179382\u000111214|10 |11 |13 |14 |-1 |15 |16 |17 |1743  |1348|1492  |1742\u0001411311\u00011741\u00011736\u00011731\u00011739          |1331  |1349|1490|1332|-1  |5  |\n",
      "|0    |0        |0        |12 |77517\u0001418\u0001419\u000168464\u0001420\u0001421\u0001422\u000119118\u0001423\u0001424|876\u0001535\u0001536\u0001537\u0001538\u0001539\u0001540\u0001541\u0001542\u0001543|209\u0001210\u000168326\u0001211\u0001530271\u0001212\u0001213385\u0001213\u0001214\u0001215|18\u000119\u000120\u000121\u000126\u000128\u0001530261\u000131\u0001179382\u000111214|10 |11 |13 |14 |-1 |15 |16 |17 |1293  |1301|1292  |1297\u00011361\u00011296\u000117177\u00011295\u0001159678         |1294  |1299|1298|1300|1373|5  |\n",
      "|0    |0        |0        |12 |77517\u0001418\u0001419\u000168464\u0001420\u0001421\u0001422\u000119118\u0001423\u0001424|876\u0001535\u0001536\u0001537\u0001538\u0001539\u0001540\u0001541\u0001542\u0001543|209\u0001210\u000168326\u0001211\u0001530271\u0001212\u0001213385\u0001213\u0001214\u0001215|18\u000119\u000120\u000121\u000126\u000128\u0001530261\u000131\u0001179382\u000111214|10 |11 |13 |14 |-1 |15 |16 |17 |260277|1050|179485|63777\u000193435\u0001184105\u00011690\u000163785\u000163784\u000163783|179482|-1  |-1  |-1  |-1  |5  |\n",
      "+-----+---------+---------+---+---------------------------------------------+---------------------------------------+-----------------------------------------------+----------------------------------------+---+---+---+---+---+---+---+---+------+----+------+-----------------------------------------+------+----+----+----+----+---+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "fg_test_dataset_load = spark.read.parquet(test_output_path)\n",
    "fg_test_dataset_load[fg_test_dataset_load['101']=='12'].show(3, False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "16d95643-2298-4ae3-b125-01130900024d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2022-05-17 06:58:25          0 _SUCCESS\n",
      "2022-05-17 06:58:15     947384 part-00000-d6feb948-d17c-41e3-9f79-8f74780ed618-c000.snappy.parquet\n",
      "2022-05-17 06:58:16    1079777 part-00001-d6feb948-d17c-41e3-9f79-8f74780ed618-c000.snappy.parquet\n",
      "2022-05-17 06:58:16    1069582 part-00002-d6feb948-d17c-41e3-9f79-8f74780ed618-c000.snappy.parquet\n",
      "2022-05-17 06:58:15     927070 part-00003-d6feb948-d17c-41e3-9f79-8f74780ed618-c000.snappy.parquet\n",
      "2022-05-17 06:58:24    1065867 part-00004-d6feb948-d17c-41e3-9f79-8f74780ed618-c000.snappy.parquet\n",
      "2022-05-17 06:58:22     607252 part-00005-d6feb948-d17c-41e3-9f79-8f74780ed618-c000.snappy.parquet\n"
     ]
    }
   ],
   "source": [
    "!aws s3 ls ${MY_S3_BUCKET}/aliccp/traindata_10w.parquet/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "efd34f0c-d0be-415b-8cb2-1f64d81ebc29",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2022-05-17 06:59:34          0 _SUCCESS\n",
      "2022-05-17 06:59:24     981117 part-00000-b93ffec3-8b57-4ba2-b06f-626fedbaea29-c000.snappy.parquet\n",
      "2022-05-17 06:59:24    1113958 part-00001-b93ffec3-8b57-4ba2-b06f-626fedbaea29-c000.snappy.parquet\n",
      "2022-05-17 06:59:24    1120122 part-00002-b93ffec3-8b57-4ba2-b06f-626fedbaea29-c000.snappy.parquet\n",
      "2022-05-17 06:59:23     970861 part-00003-b93ffec3-8b57-4ba2-b06f-626fedbaea29-c000.snappy.parquet\n",
      "2022-05-17 06:59:33    1103592 part-00004-b93ffec3-8b57-4ba2-b06f-626fedbaea29-c000.snappy.parquet\n",
      "2022-05-17 06:59:30     615321 part-00005-b93ffec3-8b57-4ba2-b06f-626fedbaea29-c000.snappy.parquet\n"
     ]
    }
   ],
   "source": [
    "!aws s3 ls ${MY_S3_BUCKET}/aliccp/testdata_10w.parquet/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c597f3cd-5e71-4ceb-9c73-2ad2386eb5da",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
